package com.zwcl.common.mq;

import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.zwcl.common.core.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @Title: RocketMQProductUtils
 * @Description: rocketMq 生产者工具类
 * @date : 2020/11/4 15:05
 */

@Slf4j
@Component
public class RocketMQProductUtils {

    public static String SEPARATOR_COLON = ":";

    @Autowired
    private RocketMQTemplate template;
    @Autowired
    private MessageUtils messageUtils;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    /**
     * 单向发送, 消息无应答
     * @param topic
     * @param message
     */
    public void sendOneWay(String topic, String message){
        template.sendOneWay(topic, messageUtils.buildMessage(message));
    }

    public void sendOneWay(String topic, BasePayload payload){
        template.sendOneWay(topic, messageUtils.buildMessage(payload));
    }

    public boolean syncSend(String topic, BasePayload payload){
        try {
            Message message = messageUtils.buildMessage(payload);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic, message,5000,0);
            return syncResultHandle(topic, JsonUtils.objectToJson(payload), sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, JsonUtils.objectToJson(payload));
            return false;
        }
    }

    public boolean syncSend(String topic, String keys, BasePayload payload){
        try {
            Message message = messageUtils.buildMessage(payload, keys);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic, message,5000,0);
            return syncResultHandle(topic, JsonUtils.objectToJson(payload), sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, JsonUtils.objectToJson(payload));
            return false;
        }
    }

    public boolean syncSend(String topic, String keys, String tags, BasePayload payload){
        try {
            Message message = messageUtils.buildMessage(payload, keys);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic + SEPARATOR_COLON + tags, message,5000,0);
            return syncResultHandle(topic, JsonUtils.objectToJson(payload), sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, JsonUtils.objectToJson(payload));
            return false;
        }
    }

    public boolean syncSend(String topic, String keys, String tags, BasePayload payload,int delayLevel){
        try {
            Message message = messageUtils.buildMessage(payload, keys);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic + SEPARATOR_COLON + tags, message,5000,delayLevel);
            return syncResultHandle(topic, JsonUtils.objectToJson(payload), sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, JsonUtils.objectToJson(payload));
            return false;
        }
    }

    public boolean syncSend(String topic, String keys, String tags, String msg){
        try {
            //TODO:这种方式，已经是Json结构，没有msgID
            Message message = messageUtils.buildMessage(msg, keys);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic + SEPARATOR_COLON + tags, message,5000,0);
            return syncResultHandle(topic, msg, sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, msg);
            return false;
        }
    }

    public boolean syncSend(String topic, String keys, String tags, String msg,int delayLevel){
        try {
            //TODO:这种方式，已经是Json结构，没有msgID
            Message message = messageUtils.buildMessage(msg, keys);
            log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
            SendResult sendResult = template.syncSend(topic + SEPARATOR_COLON + tags, message,5000,delayLevel);
            return syncResultHandle(topic, msg, sendResult);
        }catch (Exception e){
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, msg);
            return false;
        }
    }

    private boolean syncResultHandle(String topic, String message, SendResult sendResult){
        if (null == sendResult) {
            return false;
        }
        log.info("topic_" + topic + "  response received msg: " + JsonUtils.objectToJson(sendResult));
        if (sendResult.getSendStatus() == SendStatus.SEND_OK ) {
            return true;
        } else {
            log.error("failed to send message, destination:{},message:{},sendResult:{}", topic, message, sendResult);
            return false;
        }
    }

    public void asyncSend(String topic, BasePayload payload){
        Message message = messageUtils.buildMessage(payload);
        log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
        template.asyncSend(topic, message, getSendCallback(topic, payload));
    }

    public void asyncSend(String topic, String keys, BasePayload payload){
        Message message = messageUtils.buildMessage(payload, keys);
        log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
        template.asyncSend(topic, message, getSendCallback(topic, payload));
    }

    public void asyncSend(String topic, String keys, String tags, BasePayload payload){
        Message message = messageUtils.buildMessage(payload, keys);
        log.info("topic_" + topic + "  send msg: " + JsonUtils.objectToJson(message));
        template.asyncSend(topic + SEPARATOR_COLON + tags, messageUtils.buildMessage(payload, keys), getSendCallback(topic, payload));
    }

    private SendCallback getSendCallback(String topic, BasePayload payload){
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic_" + topic + "  response received msg: " + JsonUtils.objectToJson(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                stringRedisTemplate.opsForHash().put(MqConstant.TOPIC_PREFIX + topic, payload.getBizId(), JsonUtils.objectToJson(payload));
            }
        };
    }

    public TransactionSendResult sendMessageInTransaction(String txProducerGroup, String topic, String keys, String tags, BasePayload payload, Object arg){
        String transactionId = IdWorker.get32UUID();
        Message message = messageUtils.buildTransactionMessage(payload, keys, transactionId);
        log.info("topic_" + topic + "  send transaction msg: " + JsonUtils.objectToJson(message));
        String destination = topic;
        if(StringUtils.isNotBlank(tags)){
            destination = topic + SEPARATOR_COLON + tags;
        }
        TransactionSendResult transactionSendResult = template.sendMessageInTransaction(txProducerGroup, destination, message, arg);
        log.info("topic_" + topic + "  response received localTransactionState={}, transactionId={}, msg: {}", transactionSendResult.getLocalTransactionState().toString(), transactionId, JsonUtils.objectToJson(transactionSendResult));
        return transactionSendResult;
    }

}